package com.tl.spark.scala

import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.regionserver.SteppingSplitPolicy
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Example Spark driver that writes a small key/value RDD into an HBase table
  * using the Hadoop "new API" output path (`TableOutputFormat` +
  * `saveAsNewAPIHadoopDataset`).
  *
  * Rows are keyed by the tuple's first element; the second element is stored
  * in column family "info", qualifier "age", of table "test_put".
  *
  * @author dong.tl
  * @since 2018-09-13
  */
object ScalaWritHbase {
  def main(args: Array[String]): Unit = {

    // Local-mode Spark context; suitable for this demo only.
    val sparkConf = new SparkConf().setMaster("local").setAppName("ScalaWritHbase")
    val sc = new SparkContext(sparkConf)

    // HBase client configuration. The ZooKeeper quorum could also come from an
    // hbase-site.xml on the classpath, but setting it explicitly here keeps the
    // example self-contained.
    val conf = HBaseConfiguration.create()
    conf.set(HConstants.ZOOKEEPER_QUORUM, "db01,db02,db03")
    // ZooKeeper client port (2181 is the default; set explicitly for clarity).
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    // Target table for TableOutputFormat.
    conf.set(TableOutputFormat.OUTPUT_TABLE, "test_put")

    // Job is used only as a convenient builder for the output configuration.
    val job = Job.getInstance(conf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    // TableOutputFormat writes Mutation subclasses (Put); Result is the *read*
    // type and was incorrect here.
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val rdd = sc.makeRDD(Array(("AA", "BBBB"), ("BB", "6"), ("CC", "777")))
    rdd.map { case (rowKey, value) =>
      val put = new Put(Bytes.toBytes(rowKey))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(value))
      // TableOutputFormat ignores the key, but populating it with the row key
      // keeps the pair meaningful for debugging and other output formats.
      (new ImmutableBytesWritable(Bytes.toBytes(rowKey)), put)
    }.saveAsNewAPIHadoopDataset(job.getConfiguration)

    sc.stop()
  }
}
